home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / managers.py < prev    next >
Text File  |  2009-11-02  |  35KB  |  1,084 lines

  1. #
  2. # Module providing the `SyncManager` class for dealing
  3. # with shared objects
  4. #
  5. # multiprocessing/managers.py
  6. #
  7. # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
  8. #
  9.  
  10. __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
  11.  
  12. #
  13. # Imports
  14. #
  15.  
  16. import os
  17. import sys
  18. import weakref
  19. import threading
  20. import array
  21. import Queue
  22.  
  23. from traceback import format_exc
  24. from multiprocessing import Process, current_process, active_children, Pool, util, connection
  25. from multiprocessing.process import AuthenticationString
  26. from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
  27. from multiprocessing.util import Finalize, info
  28.  
  29. try:
  30.     from cPickle import PicklingError
  31. except ImportError:
  32.     from pickle import PicklingError
  33.  
  34. #
  35. # Register some things for pickling
  36. #
  37.  
  38. def reduce_array(a):
  39.     return array.array, (a.typecode, a.tostring())
  40. ForkingPickler.register(array.array, reduce_array)
  41.  
  42. view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
  43.  
  44. #
  45. # Type for identifying shared objects
  46. #
  47.  
  48. class Token(object):
  49.     '''
  50.     Type to uniquely indentify a shared object
  51.     '''
  52.     __slots__ = ('typeid', 'address', 'id')
  53.  
  54.     def __init__(self, typeid, address, id):
  55.         (self.typeid, self.address, self.id) = (typeid, address, id)
  56.  
  57.     def __getstate__(self):
  58.         return (self.typeid, self.address, self.id)
  59.  
  60.     def __setstate__(self, state):
  61.         (self.typeid, self.address, self.id) = state
  62.  
  63.     def __repr__(self):
  64.         return 'Token(typeid=%r, address=%r, id=%r)' % \
  65.                (self.typeid, self.address, self.id)
  66.  
  67. #
  68. # Function for communication with a manager's server process
  69. #
  70.  
  71. def dispatch(c, id, methodname, args=(), kwds={}):
  72.     '''
  73.     Send a message to manager using connection `c` and return response
  74.     '''
  75.     c.send((id, methodname, args, kwds))
  76.     kind, result = c.recv()
  77.     if kind == '#RETURN':
  78.         return result
  79.     raise convert_to_error(kind, result)
  80.  
  81. def convert_to_error(kind, result):
  82.     if kind == '#ERROR':
  83.         return result
  84.     elif kind == '#TRACEBACK':
  85.         assert type(result) is str
  86.         return  RemoteError(result)
  87.     elif kind == '#UNSERIALIZABLE':
  88.         assert type(result) is str
  89.         return RemoteError('Unserializable message: %s\n' % result)
  90.     else:
  91.         return ValueError('Unrecognized message type')
  92.  
  93. class RemoteError(Exception):
  94.     def __str__(self):
  95.         return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
  96.  
  97. #
  98. # Functions for finding the method names of an object
  99. #
  100.  
  101. def all_methods(obj):
  102.     '''
  103.     Return a list of names of methods of `obj`
  104.     '''
  105.     temp = []
  106.     for name in dir(obj):
  107.         func = getattr(obj, name)
  108.         if hasattr(func, '__call__'):
  109.             temp.append(name)
  110.     return temp
  111.  
  112. def public_methods(obj):
  113.     '''
  114.     Return a list of names of methods of `obj` which do not start with '_'
  115.     '''
  116.     return [name for name in all_methods(obj) if name[0] != '_']
  117.  
  118. #
  119. # Server which is run in a process controlled by a manager
  120. #
  121.  
  122. class Server(object):
  123.     '''
  124.     Server class which runs in a process controlled by a manager object
  125.     '''
  126.     public = ['shutdown', 'create', 'accept_connection', 'get_methods',
  127.               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
  128.  
  129.     def __init__(self, registry, address, authkey, serializer):
  130.         assert isinstance(authkey, bytes)
  131.         self.registry = registry
  132.         self.authkey = AuthenticationString(authkey)
  133.         Listener, Client = listener_client[serializer]
  134.  
  135.         # do authentication later
  136.         self.listener = Listener(address=address, backlog=5)
  137.         self.address = self.listener.address
  138.  
  139.         self.id_to_obj = {0: (None, ())}
  140.         self.id_to_refcount = {}
  141.         self.mutex = threading.RLock()
  142.         self.stop = 0
  143.  
  144.     def serve_forever(self):
  145.         '''
  146.         Run the server forever
  147.         '''
  148.         current_process()._manager_server = self
  149.         try:
  150.             try:
  151.                 while 1:
  152.                     try:
  153.                         c = self.listener.accept()
  154.                     except (OSError, IOError):
  155.                         continue
  156.                     t = threading.Thread(target=self.handle_request, args=(c,))
  157.                     t.daemon = True
  158.                     t.start()
  159.             except (KeyboardInterrupt, SystemExit):
  160.                 pass
  161.         finally:
  162.             self.stop = 999
  163.             self.listener.close()
  164.  
  165.     def handle_request(self, c):
  166.         '''
  167.         Handle a new connection
  168.         '''
  169.         funcname = result = request = None
  170.         try:
  171.             connection.deliver_challenge(c, self.authkey)
  172.             connection.answer_challenge(c, self.authkey)
  173.             request = c.recv()
  174.             ignore, funcname, args, kwds = request
  175.             assert funcname in self.public, '%r unrecognized' % funcname
  176.             func = getattr(self, funcname)
  177.         except Exception:
  178.             msg = ('#TRACEBACK', format_exc())
  179.         else:
  180.             try:
  181.                 result = func(c, *args, **kwds)
  182.             except Exception:
  183.                 msg = ('#TRACEBACK', format_exc())
  184.             else:
  185.                 msg = ('#RETURN', result)
  186.         try:
  187.             c.send(msg)
  188.         except Exception, e:
  189.             try:
  190.                 c.send(('#TRACEBACK', format_exc()))
  191.             except Exception:
  192.                 pass
  193.             util.info('Failure to send message: %r', msg)
  194.             util.info(' ... request was %r', request)
  195.             util.info(' ... exception was %r', e)
  196.  
  197.         c.close()
  198.  
  199.     def serve_client(self, conn):
  200.         '''
  201.         Handle requests from the proxies in a particular process/thread
  202.         '''
  203.         util.debug('starting server thread to service %r',
  204.                    threading.current_thread().name)
  205.  
  206.         recv = conn.recv
  207.         send = conn.send
  208.         id_to_obj = self.id_to_obj
  209.  
  210.         while not self.stop:
  211.  
  212.             try:
  213.                 methodname = obj = None
  214.                 request = recv()
  215.                 ident, methodname, args, kwds = request
  216.                 obj, exposed, gettypeid = id_to_obj[ident]
  217.  
  218.                 if methodname not in exposed:
  219.                     raise AttributeError(
  220.                         'method %r of %r object is not in exposed=%r' %
  221.                         (methodname, type(obj), exposed)
  222.                         )
  223.  
  224.                 function = getattr(obj, methodname)
  225.  
  226.                 try:
  227.                     res = function(*args, **kwds)
  228.                 except Exception, e:
  229.                     msg = ('#ERROR', e)
  230.                 else:
  231.                     typeid = gettypeid and gettypeid.get(methodname, None)
  232.                     if typeid:
  233.                         rident, rexposed = self.create(conn, typeid, res)
  234.                         token = Token(typeid, self.address, rident)
  235.                         msg = ('#PROXY', (rexposed, token))
  236.                     else:
  237.                         msg = ('#RETURN', res)
  238.  
  239.             except AttributeError:
  240.                 if methodname is None:
  241.                     msg = ('#TRACEBACK', format_exc())
  242.                 else:
  243.                     try:
  244.                         fallback_func = self.fallback_mapping[methodname]
  245.                         result = fallback_func(
  246.                             self, conn, ident, obj, *args, **kwds
  247.                             )
  248.                         msg = ('#RETURN', result)
  249.                     except Exception:
  250.                         msg = ('#TRACEBACK', format_exc())
  251.  
  252.             except EOFError:
  253.                 util.debug('got EOF -- exiting thread serving %r',
  254.                            threading.current_thread().name)
  255.                 sys.exit(0)
  256.  
  257.             except Exception:
  258.                 msg = ('#TRACEBACK', format_exc())
  259.  
  260.             try:
  261.                 try:
  262.                     send(msg)
  263.                 except Exception, e:
  264.                     send(('#UNSERIALIZABLE', repr(msg)))
  265.             except Exception, e:
  266.                 util.info('exception in thread serving %r',
  267.                         threading.current_thread().name)
  268.                 util.info(' ... message was %r', msg)
  269.                 util.info(' ... exception was %r', e)
  270.                 conn.close()
  271.                 sys.exit(1)
  272.  
  273.     def fallback_getvalue(self, conn, ident, obj):
  274.         return obj
  275.  
  276.     def fallback_str(self, conn, ident, obj):
  277.         return str(obj)
  278.  
  279.     def fallback_repr(self, conn, ident, obj):
  280.         return repr(obj)
  281.  
  282.     fallback_mapping = {
  283.         '__str__':fallback_str,
  284.         '__repr__':fallback_repr,
  285.         '#GETVALUE':fallback_getvalue
  286.         }
  287.  
  288.     def dummy(self, c):
  289.         pass
  290.  
  291.     def debug_info(self, c):
  292.         '''
  293.         Return some info --- useful to spot problems with refcounting
  294.         '''
  295.         self.mutex.acquire()
  296.         try:
  297.             result = []
  298.             keys = self.id_to_obj.keys()
  299.             keys.sort()
  300.             for ident in keys:
  301.                 if ident != 0:
  302.                     result.append('  %s:       refcount=%s\n    %s' %
  303.                                   (ident, self.id_to_refcount[ident],
  304.                                    str(self.id_to_obj[ident][0])[:75]))
  305.             return '\n'.join(result)
  306.         finally:
  307.             self.mutex.release()
  308.  
  309.     def number_of_objects(self, c):
  310.         '''
  311.         Number of shared objects
  312.         '''
  313.         return len(self.id_to_obj) - 1      # don't count ident=0
  314.  
  315.     def shutdown(self, c):
  316.         '''
  317.         Shutdown this process
  318.         '''
  319.         try:
  320.             try:
  321.                 util.debug('manager received shutdown message')
  322.                 c.send(('#RETURN', None))
  323.  
  324.                 if sys.stdout != sys.__stdout__:
  325.                     util.debug('resetting stdout, stderr')
  326.                     sys.stdout = sys.__stdout__
  327.                     sys.stderr = sys.__stderr__
  328.  
  329.                 util._run_finalizers(0)
  330.  
  331.                 for p in active_children():
  332.                     util.debug('terminating a child process of manager')
  333.                     p.terminate()
  334.  
  335.                 for p in active_children():
  336.                     util.debug('terminating a child process of manager')
  337.                     p.join()
  338.  
  339.                 util._run_finalizers()
  340.                 util.info('manager exiting with exitcode 0')
  341.             except:
  342.                 import traceback
  343.                 traceback.print_exc()
  344.         finally:
  345.             exit(0)
  346.  
  347.     def create(self, c, typeid, *args, **kwds):
  348.         '''
  349.         Create a new shared object and return its id
  350.         '''
  351.         self.mutex.acquire()
  352.         try:
  353.             callable, exposed, method_to_typeid, proxytype = \
  354.                       self.registry[typeid]
  355.  
  356.             if callable is None:
  357.                 assert len(args) == 1 and not kwds
  358.                 obj = args[0]
  359.             else:
  360.                 obj = callable(*args, **kwds)
  361.  
  362.             if exposed is None:
  363.                 exposed = public_methods(obj)
  364.             if method_to_typeid is not None:
  365.                 assert type(method_to_typeid) is dict
  366.                 exposed = list(exposed) + list(method_to_typeid)
  367.  
  368.             ident = '%x' % id(obj)  # convert to string because xmlrpclib
  369.                                     # only has 32 bit signed integers
  370.             util.debug('%r callable returned object with id %r', typeid, ident)
  371.  
  372.             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
  373.             if ident not in self.id_to_refcount:
  374.                 self.id_to_refcount[ident] = 0
  375.             # increment the reference count immediately, to avoid
  376.             # this object being garbage collected before a Proxy
  377.             # object for it can be created.  The caller of create()
  378.             # is responsible for doing a decref once the Proxy object
  379.             # has been created.
  380.             self.incref(c, ident)
  381.             return ident, tuple(exposed)
  382.         finally:
  383.             self.mutex.release()
  384.  
  385.     def get_methods(self, c, token):
  386.         '''
  387.         Return the methods of the shared object indicated by token
  388.         '''
  389.         return tuple(self.id_to_obj[token.id][1])
  390.  
  391.     def accept_connection(self, c, name):
  392.         '''
  393.         Spawn a new thread to serve this connection
  394.         '''
  395.         threading.current_thread().name = name
  396.         c.send(('#RETURN', None))
  397.         self.serve_client(c)
  398.  
  399.     def incref(self, c, ident):
  400.         self.mutex.acquire()
  401.         try:
  402.             self.id_to_refcount[ident] += 1
  403.         finally:
  404.             self.mutex.release()
  405.  
  406.     def decref(self, c, ident):
  407.         self.mutex.acquire()
  408.         try:
  409.             assert self.id_to_refcount[ident] >= 1
  410.             self.id_to_refcount[ident] -= 1
  411.             if self.id_to_refcount[ident] == 0:
  412.                 del self.id_to_obj[ident], self.id_to_refcount[ident]
  413.                 util.debug('disposing of obj with id %r', ident)
  414.         finally:
  415.             self.mutex.release()
  416.  
  417. #
  418. # Class to represent state of a manager
  419. #
  420.  
  421. class State(object):
  422.     __slots__ = ['value']
  423.     INITIAL = 0
  424.     STARTED = 1
  425.     SHUTDOWN = 2
  426.  
  427. #
  428. # Mapping from serializer name to Listener and Client types
  429. #
  430.  
  431. listener_client = {
  432.     'pickle' : (connection.Listener, connection.Client),
  433.     'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
  434.     }
  435.  
  436. #
  437. # Definition of BaseManager
  438. #
  439.  
  440. class BaseManager(object):
  441.     '''
  442.     Base class for managers
  443.     '''
  444.     _registry = {}
  445.     _Server = Server
  446.  
  447.     def __init__(self, address=None, authkey=None, serializer='pickle'):
  448.         if authkey is None:
  449.             authkey = current_process().authkey
  450.         self._address = address     # XXX not final address if eg ('', 0)
  451.         self._authkey = AuthenticationString(authkey)
  452.         self._state = State()
  453.         self._state.value = State.INITIAL
  454.         self._serializer = serializer
  455.         self._Listener, self._Client = listener_client[serializer]
  456.  
  457.     def __reduce__(self):
  458.         return type(self).from_address, \
  459.                (self._address, self._authkey, self._serializer)
  460.  
  461.     def get_server(self):
  462.         '''
  463.         Return server object with serve_forever() method and address attribute
  464.         '''
  465.         assert self._state.value == State.INITIAL
  466.         return Server(self._registry, self._address,
  467.                       self._authkey, self._serializer)
  468.  
  469.     def connect(self):
  470.         '''
  471.         Connect manager object to the server process
  472.         '''
  473.         Listener, Client = listener_client[self._serializer]
  474.         conn = Client(self._address, authkey=self._authkey)
  475.         dispatch(conn, None, 'dummy')
  476.         self._state.value = State.STARTED
  477.  
  478.     def start(self):
  479.         '''
  480.         Spawn a server process for this manager object
  481.         '''
  482.         assert self._state.value == State.INITIAL
  483.  
  484.         # pipe over which we will retrieve address of server
  485.         reader, writer = connection.Pipe(duplex=False)
  486.  
  487.         # spawn process which runs a server
  488.         self._process = Process(
  489.             target=type(self)._run_server,
  490.             args=(self._registry, self._address, self._authkey,
  491.                   self._serializer, writer),
  492.             )
  493.         ident = ':'.join(str(i) for i in self._process._identity)
  494.         self._process.name = type(self).__name__  + '-' + ident
  495.         self._process.start()
  496.  
  497.         # get address of server
  498.         writer.close()
  499.         self._address = reader.recv()
  500.         reader.close()
  501.  
  502.         # register a finalizer
  503.         self._state.value = State.STARTED
  504.         self.shutdown = util.Finalize(
  505.             self, type(self)._finalize_manager,
  506.             args=(self._process, self._address, self._authkey,
  507.                   self._state, self._Client),
  508.             exitpriority=0
  509.             )
  510.  
  511.     @classmethod
  512.     def _run_server(cls, registry, address, authkey, serializer, writer):
  513.         '''
  514.         Create a server, report its address and run it
  515.         '''
  516.         # create server
  517.         server = cls._Server(registry, address, authkey, serializer)
  518.  
  519.         # inform parent process of the server's address
  520.         writer.send(server.address)
  521.         writer.close()
  522.  
  523.         # run the manager
  524.         util.info('manager serving at %r', server.address)
  525.         server.serve_forever()
  526.  
  527.     def _create(self, typeid, *args, **kwds):
  528.         '''
  529.         Create a new shared object; return the token and exposed tuple
  530.         '''
  531.         assert self._state.value == State.STARTED, 'server not yet started'
  532.         conn = self._Client(self._address, authkey=self._authkey)
  533.         try:
  534.             id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  535.         finally:
  536.             conn.close()
  537.         return Token(typeid, self._address, id), exposed
  538.  
  539.     def join(self, timeout=None):
  540.         '''
  541.         Join the manager process (if it has been spawned)
  542.         '''
  543.         self._process.join(timeout)
  544.  
  545.     def _debug_info(self):
  546.         '''
  547.         Return some info about the servers shared objects and connections
  548.         '''
  549.         conn = self._Client(self._address, authkey=self._authkey)
  550.         try:
  551.             return dispatch(conn, None, 'debug_info')
  552.         finally:
  553.             conn.close()
  554.  
  555.     def _number_of_objects(self):
  556.         '''
  557.         Return the number of shared objects
  558.         '''
  559.         conn = self._Client(self._address, authkey=self._authkey)
  560.         try:
  561.             return dispatch(conn, None, 'number_of_objects')
  562.         finally:
  563.             conn.close()
  564.  
  565.     def __enter__(self):
  566.         return self
  567.  
  568.     def __exit__(self, exc_type, exc_val, exc_tb):
  569.         self.shutdown()
  570.  
  571.     @staticmethod
  572.     def _finalize_manager(process, address, authkey, state, _Client):
  573.         '''
  574.         Shutdown the manager process; will be registered as a finalizer
  575.         '''
  576.         if process.is_alive():
  577.             util.info('sending shutdown message to manager')
  578.             try:
  579.                 conn = _Client(address, authkey=authkey)
  580.                 try:
  581.                     dispatch(conn, None, 'shutdown')
  582.                 finally:
  583.                     conn.close()
  584.             except Exception:
  585.                 pass
  586.  
  587.             process.join(timeout=0.2)
  588.             if process.is_alive():
  589.                 util.info('manager still alive')
  590.                 if hasattr(process, 'terminate'):
  591.                     util.info('trying to `terminate()` manager process')
  592.                     process.terminate()
  593.                     process.join(timeout=0.1)
  594.                     if process.is_alive():
  595.                         util.info('manager still alive after terminate')
  596.  
  597.         state.value = State.SHUTDOWN
  598.         try:
  599.             del BaseProxy._address_to_local[address]
  600.         except KeyError:
  601.             pass
  602.  
  603.     address = property(lambda self: self._address)
  604.  
  605.     @classmethod
  606.     def register(cls, typeid, callable=None, proxytype=None, exposed=None,
  607.                  method_to_typeid=None, create_method=True):
  608.         '''
  609.         Register a typeid with the manager type
  610.         '''
  611.         if '_registry' not in cls.__dict__:
  612.             cls._registry = cls._registry.copy()
  613.  
  614.         if proxytype is None:
  615.             proxytype = AutoProxy
  616.  
  617.         exposed = exposed or getattr(proxytype, '_exposed_', None)
  618.  
  619.         method_to_typeid = method_to_typeid or \
  620.                            getattr(proxytype, '_method_to_typeid_', None)
  621.  
  622.         if method_to_typeid:
  623.             for key, value in method_to_typeid.items():
  624.                 assert type(key) is str, '%r is not a string' % key
  625.                 assert type(value) is str, '%r is not a string' % value
  626.  
  627.         cls._registry[typeid] = (
  628.             callable, exposed, method_to_typeid, proxytype
  629.             )
  630.  
  631.         if create_method:
  632.             def temp(self, *args, **kwds):
  633.                 util.debug('requesting creation of a shared %r object', typeid)
  634.                 token, exp = self._create(typeid, *args, **kwds)
  635.                 proxy = proxytype(
  636.                     token, self._serializer, manager=self,
  637.                     authkey=self._authkey, exposed=exp
  638.                     )
  639.                 conn = self._Client(token.address, authkey=self._authkey)
  640.                 dispatch(conn, None, 'decref', (token.id,))
  641.                 return proxy
  642.             temp.__name__ = typeid
  643.             setattr(cls, typeid, temp)
  644.  
  645. #
  646. # Subclass of set which get cleared after a fork
  647. #
  648.  
  649. class ProcessLocalSet(set):
  650.     def __init__(self):
  651.         util.register_after_fork(self, lambda obj: obj.clear())
  652.     def __reduce__(self):
  653.         return type(self), ()
  654.  
  655. #
  656. # Definition of BaseProxy
  657. #
  658.  
  659. class BaseProxy(object):
  660.     '''
  661.     A base for proxies of shared objects
  662.     '''
  663.     _address_to_local = {}
  664.     _mutex = util.ForkAwareThreadLock()
  665.  
  666.     def __init__(self, token, serializer, manager=None,
  667.                  authkey=None, exposed=None, incref=True):
  668.         BaseProxy._mutex.acquire()
  669.         try:
  670.             tls_idset = BaseProxy._address_to_local.get(token.address, None)
  671.             if tls_idset is None:
  672.                 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
  673.                 BaseProxy._address_to_local[token.address] = tls_idset
  674.         finally:
  675.             BaseProxy._mutex.release()
  676.  
  677.         # self._tls is used to record the connection used by this
  678.         # thread to communicate with the manager at token.address
  679.         self._tls = tls_idset[0]
  680.  
  681.         # self._idset is used to record the identities of all shared
  682.         # objects for which the current process owns references and
  683.         # which are in the manager at token.address
  684.         self._idset = tls_idset[1]
  685.  
  686.         self._token = token
  687.         self._id = self._token.id
  688.         self._manager = manager
  689.         self._serializer = serializer
  690.         self._Client = listener_client[serializer][1]
  691.  
  692.         if authkey is not None:
  693.             self._authkey = AuthenticationString(authkey)
  694.         elif self._manager is not None:
  695.             self._authkey = self._manager._authkey
  696.         else:
  697.             self._authkey = current_process().authkey
  698.  
  699.         if incref:
  700.             self._incref()
  701.  
  702.         util.register_after_fork(self, BaseProxy._after_fork)
  703.  
  704.     def _connect(self):
  705.         util.debug('making connection to manager')
  706.         name = current_process().name
  707.         if threading.current_thread().name != 'MainThread':
  708.             name += '|' + threading.current_thread().name
  709.         conn = self._Client(self._token.address, authkey=self._authkey)
  710.         dispatch(conn, None, 'accept_connection', (name,))
  711.         self._tls.connection = conn
  712.  
  713.     def _callmethod(self, methodname, args=(), kwds={}):
  714.         '''
  715.         Try to call a method of the referrent and return a copy of the result
  716.         '''
  717.         try:
  718.             conn = self._tls.connection
  719.         except AttributeError:
  720.             util.debug('thread %r does not own a connection',
  721.                        threading.current_thread().name)
  722.             self._connect()
  723.             conn = self._tls.connection
  724.  
  725.         conn.send((self._id, methodname, args, kwds))
  726.         kind, result = conn.recv()
  727.  
  728.         if kind == '#RETURN':
  729.             return result
  730.         elif kind == '#PROXY':
  731.             exposed, token = result
  732.             proxytype = self._manager._registry[token.typeid][-1]
  733.             proxy = proxytype(
  734.                 token, self._serializer, manager=self._manager,
  735.                 authkey=self._authkey, exposed=exposed
  736.                 )
  737.             conn = self._Client(token.address, authkey=self._authkey)
  738.             dispatch(conn, None, 'decref', (token.id,))
  739.             return proxy
  740.         raise convert_to_error(kind, result)
  741.  
  742.     def _getvalue(self):
  743.         '''
  744.         Get a copy of the value of the referent
  745.         '''
  746.         return self._callmethod('#GETVALUE')
  747.  
  748.     def _incref(self):
  749.         conn = self._Client(self._token.address, authkey=self._authkey)
  750.         dispatch(conn, None, 'incref', (self._id,))
  751.         util.debug('INCREF %r', self._token.id)
  752.  
  753.         self._idset.add(self._id)
  754.  
  755.         state = self._manager and self._manager._state
  756.  
  757.         self._close = util.Finalize(
  758.             self, BaseProxy._decref,
  759.             args=(self._token, self._authkey, state,
  760.                   self._tls, self._idset, self._Client),
  761.             exitpriority=10
  762.             )
  763.  
  764.     @staticmethod
  765.     def _decref(token, authkey, state, tls, idset, _Client):
  766.         idset.discard(token.id)
  767.  
  768.         # check whether manager is still alive
  769.         if state is None or state.value == State.STARTED:
  770.             # tell manager this process no longer cares about referent
  771.             try:
  772.                 util.debug('DECREF %r', token.id)
  773.                 conn = _Client(token.address, authkey=authkey)
  774.                 dispatch(conn, None, 'decref', (token.id,))
  775.             except Exception, e:
  776.                 util.debug('... decref failed %s', e)
  777.  
  778.         else:
  779.             util.debug('DECREF %r -- manager already shutdown', token.id)
  780.  
  781.         # check whether we can close this thread's connection because
  782.         # the process owns no more references to objects for this manager
  783.         if not idset and hasattr(tls, 'connection'):
  784.             util.debug('thread %r has no more proxies so closing conn',
  785.                        threading.current_thread().name)
  786.             tls.connection.close()
  787.             del tls.connection
  788.  
  789.     def _after_fork(self):
  790.         self._manager = None
  791.         try:
  792.             self._incref()
  793.         except Exception, e:
  794.             # the proxy may just be for a manager which has shutdown
  795.             util.info('incref failed: %s' % e)
  796.  
  797.     def __reduce__(self):
  798.         kwds = {}
  799.         if Popen.thread_is_spawning():
  800.             kwds['authkey'] = self._authkey
  801.  
  802.         if getattr(self, '_isauto', False):
  803.             kwds['exposed'] = self._exposed_
  804.             return (RebuildProxy,
  805.                     (AutoProxy, self._token, self._serializer, kwds))
  806.         else:
  807.             return (RebuildProxy,
  808.                     (type(self), self._token, self._serializer, kwds))
  809.  
  810.     def __deepcopy__(self, memo):
  811.         return self._getvalue()
  812.  
  813.     def __repr__(self):
  814.         return '<%s object, typeid %r at %s>' % \
  815.                (type(self).__name__, self._token.typeid, '0x%x' % id(self))
  816.  
  817.     def __str__(self):
  818.         '''
  819.         Return representation of the referent (or a fall-back if that fails)
  820.         '''
  821.         try:
  822.             return self._callmethod('__repr__')
  823.         except Exception:
  824.             return repr(self)[:-1] + "; '__str__()' failed>"
  825.  
  826. #
  827. # Function used for unpickling
  828. #
  829.  
  830. def RebuildProxy(func, token, serializer, kwds):
  831.     '''
  832.     Function used for unpickling proxy objects.
  833.  
  834.     If possible the shared object is returned, or otherwise a proxy for it.
  835.     '''
  836.     server = getattr(current_process(), '_manager_server', None)
  837.  
  838.     if server and server.address == token.address:
  839.         return server.id_to_obj[token.id][0]
  840.     else:
  841.         incref = (
  842.             kwds.pop('incref', True) and
  843.             not getattr(current_process(), '_inheriting', False)
  844.             )
  845.         return func(token, serializer, incref=incref, **kwds)
  846.  
  847. #
  848. # Functions to create proxies and proxy types
  849. #
  850.  
  851. def MakeProxyType(name, exposed, _cache={}):
  852.     '''
  853.     Return an proxy type whose methods are given by `exposed`
  854.     '''
  855.     exposed = tuple(exposed)
  856.     try:
  857.         return _cache[(name, exposed)]
  858.     except KeyError:
  859.         pass
  860.  
  861.     dic = {}
  862.  
  863.     for meth in exposed:
  864.         exec '''def %s(self, *args, **kwds):
  865.         return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
  866.  
  867.     ProxyType = type(name, (BaseProxy,), dic)
  868.     ProxyType._exposed_ = exposed
  869.     _cache[(name, exposed)] = ProxyType
  870.     return ProxyType
  871.  
  872.  
  873. def AutoProxy(token, serializer, manager=None, authkey=None,
  874.               exposed=None, incref=True):
  875.     '''
  876.     Return an auto-proxy for `token`
  877.     '''
  878.     _Client = listener_client[serializer][1]
  879.  
  880.     if exposed is None:
  881.         conn = _Client(token.address, authkey=authkey)
  882.         try:
  883.             exposed = dispatch(conn, None, 'get_methods', (token,))
  884.         finally:
  885.             conn.close()
  886.  
  887.     if authkey is None and manager is not None:
  888.         authkey = manager._authkey
  889.     if authkey is None:
  890.         authkey = current_process().authkey
  891.  
  892.     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
  893.     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
  894.                       incref=incref)
  895.     proxy._isauto = True
  896.     return proxy
  897.  
  898. #
  899. # Types/callables which we will register with SyncManager
  900. #
  901.  
  902. class Namespace(object):
  903.     def __init__(self, **kwds):
  904.         self.__dict__.update(kwds)
  905.     def __repr__(self):
  906.         items = self.__dict__.items()
  907.         temp = []
  908.         for name, value in items:
  909.             if not name.startswith('_'):
  910.                 temp.append('%s=%r' % (name, value))
  911.         temp.sort()
  912.         return 'Namespace(%s)' % str.join(', ', temp)
  913.  
  914. class Value(object):
  915.     def __init__(self, typecode, value, lock=True):
  916.         self._typecode = typecode
  917.         self._value = value
  918.     def get(self):
  919.         return self._value
  920.     def set(self, value):
  921.         self._value = value
  922.     def __repr__(self):
  923.         return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
  924.     value = property(get, set)
  925.  
  926. def Array(typecode, sequence, lock=True):
  927.     return array.array(typecode, sequence)
  928.  
  929. #
  930. # Proxy types used by SyncManager
  931. #
  932.  
  933. class IteratorProxy(BaseProxy):
  934.     # XXX remove methods for Py3.0 and Py2.6
  935.     _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
  936.     def __iter__(self):
  937.         return self
  938.     def __next__(self, *args):
  939.         return self._callmethod('__next__', args)
  940.     def next(self, *args):
  941.         return self._callmethod('next', args)
  942.     def send(self, *args):
  943.         return self._callmethod('send', args)
  944.     def throw(self, *args):
  945.         return self._callmethod('throw', args)
  946.     def close(self, *args):
  947.         return self._callmethod('close', args)
  948.  
  949.  
  950. class AcquirerProxy(BaseProxy):
  951.     _exposed_ = ('acquire', 'release')
  952.     def acquire(self, blocking=True):
  953.         return self._callmethod('acquire', (blocking,))
  954.     def release(self):
  955.         return self._callmethod('release')
  956.     def __enter__(self):
  957.         return self._callmethod('acquire')
  958.     def __exit__(self, exc_type, exc_val, exc_tb):
  959.         return self._callmethod('release')
  960.  
  961.  
  962. class ConditionProxy(AcquirerProxy):
  963.     # XXX will Condition.notfyAll() name be available in Py3.0?
  964.     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
  965.     def wait(self, timeout=None):
  966.         return self._callmethod('wait', (timeout,))
  967.     def notify(self):
  968.         return self._callmethod('notify')
  969.     def notify_all(self):
  970.         return self._callmethod('notify_all')
  971.  
  972. class EventProxy(BaseProxy):
  973.     _exposed_ = ('is_set', 'set', 'clear', 'wait')
  974.     def is_set(self):
  975.         return self._callmethod('is_set')
  976.     def set(self):
  977.         return self._callmethod('set')
  978.     def clear(self):
  979.         return self._callmethod('clear')
  980.     def wait(self, timeout=None):
  981.         return self._callmethod('wait', (timeout,))
  982.  
  983. class NamespaceProxy(BaseProxy):
  984.     _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
  985.     def __getattr__(self, key):
  986.         if key[0] == '_':
  987.             return object.__getattribute__(self, key)
  988.         callmethod = object.__getattribute__(self, '_callmethod')
  989.         return callmethod('__getattribute__', (key,))
  990.     def __setattr__(self, key, value):
  991.         if key[0] == '_':
  992.             return object.__setattr__(self, key, value)
  993.         callmethod = object.__getattribute__(self, '_callmethod')
  994.         return callmethod('__setattr__', (key, value))
  995.     def __delattr__(self, key):
  996.         if key[0] == '_':
  997.             return object.__delattr__(self, key)
  998.         callmethod = object.__getattribute__(self, '_callmethod')
  999.         return callmethod('__delattr__', (key,))
  1000.  
  1001.  
  1002. class ValueProxy(BaseProxy):
  1003.     _exposed_ = ('get', 'set')
  1004.     def get(self):
  1005.         return self._callmethod('get')
  1006.     def set(self, value):
  1007.         return self._callmethod('set', (value,))
  1008.     value = property(get, set)
  1009.  
  1010.  
  1011. BaseListProxy = MakeProxyType('BaseListProxy', (
  1012.     '__add__', '__contains__', '__delitem__', '__delslice__',
  1013.     '__getitem__', '__getslice__', '__len__', '__mul__',
  1014.     '__reversed__', '__rmul__', '__setitem__', '__setslice__',
  1015.     'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
  1016.     'reverse', 'sort', '__imul__'
  1017.     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
  1018. class ListProxy(BaseListProxy):
  1019.     def __iadd__(self, value):
  1020.         self._callmethod('extend', (value,))
  1021.         return self
  1022.     def __imul__(self, value):
  1023.         self._callmethod('__imul__', (value,))
  1024.         return self
  1025.  
  1026.  
  1027. DictProxy = MakeProxyType('DictProxy', (
  1028.     '__contains__', '__delitem__', '__getitem__', '__len__',
  1029.     '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
  1030.     'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
  1031.     ))
  1032.  
  1033.  
  1034. ArrayProxy = MakeProxyType('ArrayProxy', (
  1035.     '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
  1036.     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
  1037.  
  1038.  
  1039. PoolProxy = MakeProxyType('PoolProxy', (
  1040.     'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
  1041.     'map', 'map_async', 'terminate'
  1042.     ))
  1043. PoolProxy._method_to_typeid_ = {
  1044.     'apply_async': 'AsyncResult',
  1045.     'map_async': 'AsyncResult',
  1046.     'imap': 'Iterator',
  1047.     'imap_unordered': 'Iterator'
  1048.     }
  1049.  
  1050. #
  1051. # Definition of SyncManager
  1052. #
  1053.  
  1054. class SyncManager(BaseManager):
  1055.     '''
  1056.     Subclass of `BaseManager` which supports a number of shared object types.
  1057.  
  1058.     The types registered are those intended for the synchronization
  1059.     of threads, plus `dict`, `list` and `Namespace`.
  1060.  
  1061.     The `multiprocessing.Manager()` function creates started instances of
  1062.     this class.
  1063.     '''
  1064.  
  1065. SyncManager.register('Queue', Queue.Queue)
  1066. SyncManager.register('JoinableQueue', Queue.Queue)
  1067. SyncManager.register('Event', threading.Event, EventProxy)
  1068. SyncManager.register('Lock', threading.Lock, AcquirerProxy)
  1069. SyncManager.register('RLock', threading.RLock, AcquirerProxy)
  1070. SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
  1071. SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
  1072.                      AcquirerProxy)
  1073. SyncManager.register('Condition', threading.Condition, ConditionProxy)
  1074. SyncManager.register('Pool', Pool, PoolProxy)
  1075. SyncManager.register('list', list, ListProxy)
  1076. SyncManager.register('dict', dict, DictProxy)
  1077. SyncManager.register('Value', Value, ValueProxy)
  1078. SyncManager.register('Array', Array, ArrayProxy)
  1079. SyncManager.register('Namespace', Namespace, NamespaceProxy)
  1080.  
  1081. # types returned by methods of PoolProxy
  1082. SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
  1083. SyncManager.register('AsyncResult', create_method=False)
  1084.